
#include "config.h"

#include <pthread.h>
#include <sys/time.h>
#include <errno.h>

#define DBG_SUBSYS S_LIBYLIB

#include "ylock.h"
#include "sysutil.h"
#include "schedule.h"
#include "plock.h"
#include "types.h"
#include "ypage.h"

#include "lsv_volume_proto.h"
#include "lsv_volume.h"
#include "lsv_bitmap.h"
#include "lsv_bitmap_internal.h"
#include "lsv_help.h"
#include "lsv_bitmap_hash_cache.h"

#define CACHE_CHANAL
#define CACHE_PAGE_SIZE 4096

/* One cached page: a fixed-size buffer of page data plus the bookkeeping
 * needed to find it (hash key), age it (LRU list hook + hit counter) and
 * free it (back-pointer to the owning cache). */
typedef struct {
        struct list_head                hook;           /* links entry into hash_cache->cache_list; tail = most recently bumped */
        uint64_t                        page_idx;       /* page number within the volume; also the hash-table key */
        plock_t                         lock;           /* per-entry lock (initialized/destroyed here; not visibly taken in this file) */
        atomic_t                        counter;        /* hit counter; reaching the threshold triggers an LRU bump */
        void                            *page_data;     /* CACHE_PAGE_SIZE-byte buffer holding the page contents */
        lsv_hash_cache_context_t        *hash_cache;    /* back-pointer to the owning cache context */
} lsv_cache_entry_t;

/* Hash-table key function: fold a 64-bit page index into 2^20 buckets. */
static uint32_t lsv_hash_cache_key_func(const void *k)
{
        const uint64_t *page_idx = k;

        /* 1048576 is a power of two, so masking equals the modulo. */
        return (uint32_t)(*page_idx & (1048576 - 1));
}

/* Hash-table compare function.
 * s1 is a stored lsv_cache_entry_t; s2 is a bare uint64_t page index key.
 * Returns <0, 0 or >0 in strcmp style. */
static int lsv_hash_cache_cmp_func(const void *s1, const void *s2)
{
        const lsv_cache_entry_t *entry = s1;
        const uint64_t key = *(const uint64_t *)s2;

        if (entry->page_idx == key)
                return 0;

        return (entry->page_idx < key) ? -1 : 1;
}

/* Destructor for a cache entry: releases the page buffer and per-entry
 * lock, decrements the owner's page count, then frees the entry itself.
 * The caller must already have unlinked the entry from list and table. */
static inline void lsv_hash_cache_free_func(void *arg)
{
        lsv_cache_entry_t *entry = arg;
        lsv_hash_cache_context_t *owner = entry->hash_cache;

        /* page_data was allocated with xmalloc, so xfree matches it */
        xfree(entry->page_data);
        plock_destroy(&entry->lock);
        atomic_dec(&owner->page_count);

        yfree((void **)&entry);
}

/* Translate a page index into its (chunk id, byte offset in chunk) pair. */
static inline void page_idx_to_chunks(uint64_t page_idx, uint32_t *chunk_id, uint32_t *chunk_off)
{
        /* absolute byte offset of the page within the volume */
        uint64_t byte_off = page_idx * CACHE_PAGE_SIZE;

        *chunk_id = (uint32_t)(byte_off / LSV_CHUNK_SIZE);
        *chunk_off = (uint32_t)(byte_off % LSV_CHUNK_SIZE);
}

/* Inverse of page_idx_to_chunks: map (chunk id, chunk offset) to a page index. */
static inline uint64_t chunk_to_page_idx(uint32_t chunk_id, uint32_t chunk_off)
{
        uint64_t byte_off = (uint64_t)chunk_id * LSV_CHUNK_SIZE + chunk_off;

        return byte_off / CACHE_PAGE_SIZE;
}

/* Allocate a cache entry for @page_idx, fill it from backing storage, and
 * append it to the LRU list, evicting the list head if the cache is over
 * its size threshold. On success *e is set and 0 is returned; on failure
 * *e is NULL and an error code (ENOMEM or an IO error) is returned.
 * Caller must hold hc->lock for writing: the list and eviction are not
 * otherwise protected here. */
static inline int lsv_hash_cache_malloc_func(lsv_hash_cache_context_t *hc, lsv_cache_entry_t **e, uint64_t page_idx)
{
        int ret;
        lsv_cache_entry_t *_e;
        uint32_t chunk_id;
        uint32_t chunk_off;

        *e = NULL;

        _e = xmalloc(sizeof(lsv_cache_entry_t));
        if (unlikely(!_e)) {
                ret = ENOMEM;
                GOTO(err_ret, ret);
        }

        plock_init(&_e->lock, NULL);
        atomic_set(&_e->counter, 0);

        _e->hash_cache = hc;
        _e->page_idx = page_idx;

        page_idx_to_chunks(page_idx, &chunk_id, &chunk_off);

        _e->page_data = xmalloc(CACHE_PAGE_SIZE);
        if (unlikely(!_e->page_data)) {
                ret = ENOMEM;
                YASSERT(0);
                GOTO(err_free, ret);
        }

        /* Populate the page from the volume before publishing the entry. */
        ret = lsv_bitmap_read_chunk(hc->volume_context, 0, chunk_id, chunk_off, CACHE_PAGE_SIZE, _e->page_data);
        if (unlikely(ret))
                GOTO(err_free2, ret);

        list_add_tail(&_e->hook, &hc->cache_list);

        atomic_inc(&hc->page_count);
        /* NOTE(review): threshold compares a page COUNT against
         * LSV_PAGE_SIZE * 1024 (a byte-size constant) — confirm intended. */
        if (atomic_read(&hc->page_count) > LSV_PAGE_SIZE * 1024) {      //todo it.
                lsv_cache_entry_t *e2;
                lsv_cache_entry_t *first = list_entry(hc->cache_list.next, lsv_cache_entry_t, hook);

                /* Evict the least-recently-used entry (list head). Use the
                 * hook member explicitly instead of casting the entry
                 * pointer, which only worked while hook was first. */
                list_del(&first->hook);
                ret = hash_table_remove(hc->ht, (const void *)&first->page_idx, (void **)&e2);

                YASSERT(first->page_idx != page_idx);
                YASSERT(!ret);
                YASSERT(e2 == first);

                lsv_hash_cache_free_func(e2);
        }

        *e = _e;
        return 0;
err_free2:
        /* page_data came from xmalloc: free with the matching xfree
         * (the original used yfree here, mismatching the allocator). */
        xfree(_e->page_data);
        _e->page_data = NULL;
err_free:
        yfree((void **)&_e);
err_ret:
        return ret;
}

/* Initialize a hash-cache context: create the page hash table, the two
 * locks, the empty LRU list and the page counter.
 * Returns 0 on success, ENOMEM if the hash table cannot be created
 * (the original ignored that failure and returned 0 unconditionally). */
int lsv_hash_cache_init(lsv_hash_cache_context_t *hc, const char *name, void *volume_context)
{
        /* Create the table first so an allocation failure leaves hc unused. */
        hc->ht = hash_create_table(lsv_hash_cache_cmp_func, lsv_hash_cache_key_func, name);
        if (unlikely(!hc->ht))
                return ENOMEM;

        plock_init(&hc->lock, name);
        plock_init(&hc->list_lock, name);
        atomic_set(&hc->page_count, 0);
        hc->volume_context = volume_context;

        INIT_LIST_HEAD(&hc->cache_list);

        return 0;
}

/* Free every cached page on the LRU list.
 * NOTE(review): entries are NOT removed from hc->ht here, so the table
 * briefly holds dangling pointers — safe only if the next call is
 * lsv_hash_cache_destroy(); confirm callers never do lookups in between. */
void lsv_hash_cache_release(lsv_hash_cache_context_t *hc)
{
        struct list_head *pos;
        struct list_head *n;

        list_for_each_safe(pos, n, &hc->cache_list) {
                /* Use list_entry rather than casting the node pointer: the
                 * cast only worked because hook is the first struct member. */
                lsv_cache_entry_t *ent = list_entry(pos, lsv_cache_entry_t, hook);

                list_del(&ent->hook);
                lsv_hash_cache_free_func(ent);
        }
}

/* Tear down the context created by lsv_hash_cache_init: destroy the hash
 * table (entries are expected to be freed already via release) and both
 * locks. The three teardown steps are independent of each other. */
void lsv_hash_cache_destroy(lsv_hash_cache_context_t *hc)
{
        hash_destroy_table(hc->ht, NULL);

        plock_destroy(&hc->list_lock);
        plock_destroy(&hc->lock);
}

/* Uncached read path: widen the requested range to CACHE_PAGE_SIZE
 * alignment, read it from the chunk, and copy the requested slice out.
 * Returns 0 on success or the error from lsv_bitmap_read_chunk / ENOMEM. */
int lsv_hash_cache_read_page_raw(lsv_hash_cache_context_t *hc, uint32_t chunk_id, uint32_t chunk_off, uint32_t length, void *data)
{
        uint32_t new_chunk_off;
        uint32_t new_length;
        int ret;

        range_align32(CACHE_PAGE_SIZE, chunk_off, length, &new_chunk_off, &new_length);

        void *buf = malloc(new_length);
        if (unlikely(!buf))
                return ENOMEM;  /* original dereferenced a possibly-NULL buf */

        ret = lsv_bitmap_read_chunk(hc->volume_context, 0, chunk_id, new_chunk_off, new_length, buf);
        /* only copy out if the read actually filled the buffer */
        if (likely(!ret))
                memcpy(data, buf + (chunk_off - new_chunk_off), length);

        free(buf);
        assert(ret == 0);
        return ret;
}

/* Uncached write path: read-modify-write. Reads the aligned enclosing
 * range, patches in the caller's data, and writes the range back.
 * Returns 0 on success or the first error from read/write / ENOMEM. */
int lsv_hash_cache_update_page_raw(lsv_hash_cache_context_t *hc, uint32_t chunk_id, uint32_t chunk_off, uint32_t length, void *data)
{
        uint32_t new_chunk_off;
        uint32_t new_length;
        int ret;

        range_align32(CACHE_PAGE_SIZE, chunk_off, length, &new_chunk_off, &new_length);

        void *buf = malloc(new_length);
        if (unlikely(!buf))
                return ENOMEM;  /* original dereferenced a possibly-NULL buf */

        ret = lsv_bitmap_read_chunk(hc->volume_context, 0, chunk_id, new_chunk_off, new_length, buf);
        if (likely(!ret)) {
                /* patch the caller's slice into the aligned buffer, then
                 * write the whole aligned range back */
                memcpy(buf + (chunk_off - new_chunk_off), data, length);
                ret = lsv_bitmap_write_chunk(hc->volume_context, chunk_id, new_chunk_off, new_length, buf);
        }

        free(buf);
        assert(ret == 0);
        return ret;
}

/* Read @length bytes at @page_off from cached page @page_idx into @data.
 *
 * Lookup runs under the read lock; on a miss we retry under the write
 * lock so the page can be loaded from backing storage and inserted.
 * Returns 0 on success or an error from the lock/alloc/IO layers.
 * Caller must ensure page_off + length <= CACHE_PAGE_SIZE. */
int lsv_hash_cache_get_page(lsv_hash_cache_context_t *hc, uint64_t page_idx, uint32_t page_off, uint32_t length, void *data)
{
        int ret;
        int need_write = 0;     /* 0: read lock pass, 1: write lock pass */
        lsv_cache_entry_t *e;

again:
        if (need_write)
                ret = plock_wrlock(&hc->lock);
        else
                ret = plock_rdlock(&hc->lock);
        if (unlikely(ret))
                return ret;     /* lock NOT held: must not fall through to unlock */

        e = hash_table_find(hc->ht, (void *)&page_idx);
        if (e) {
                /* Best-effort LRU bump: after enough hits, move the entry to
                 * the tail (most-recently-used end). A list_lock failure must
                 * not fail the read itself. */
                int lret = plock_wrlock(&hc->list_lock);
                if (!lret) {
                        atomic_inc(&e->counter);
                        if (atomic_read(&e->counter) > 10) {
                                atomic_set(&e->counter, 0);
                                /* entry first, list head second — the original
                                 * had these arguments swapped, moving the list
                                 * head instead of the entry */
                                list_move_tail(&e->hook, &hc->cache_list);
                        }
                        plock_unlock(&hc->list_lock);
                }

                assert(e->page_idx == page_idx);
                memcpy(data, e->page_data + page_off, length);
                ret = 0;
        } else if (!need_write) {
                /* Miss under the read lock: release and retry with the
                 * write lock so we may populate the cache. */
                need_write = 1;
                plock_unlock(&hc->lock);
                goto again;
        } else {
                /* Miss under the write lock: load the page and insert it. */
                ret = lsv_hash_cache_malloc_func(hc, &e, page_idx);
                if (unlikely(ret)) {
                        DERROR("allocate cache page error, ret = %d\r\n", ret);
                        goto err;
                }

                ret = hash_table_insert(hc->ht, (void *)e, &e->page_idx, 0);
                if (unlikely(ret)) {
                        YASSERT(0);
                }

                assert(e->page_idx == page_idx);
                memcpy(data, e->page_data + page_off, length);
        }

err:
        plock_unlock(&hc->lock);
        return ret;
}

/* Split a read of @length bytes starting at (@start_page, @page_off) into
 * per-page reads, since lsv_hash_cache_get_page works on one page at a
 * time. Errors from individual page reads are OR-ed into the result. */
static inline int lsv_hash_cache_paged_get(lsv_hash_cache_context_t *hc, uint64_t start_page, uint32_t page_off, uint32_t length, void *data)
{
        int ret = 0;
        uint8_t *dst = data;

        while (length > 0) {
                /* largest slice that stays within the current page */
                uint32_t step = min(length, CACHE_PAGE_SIZE - page_off);

                ret |= lsv_hash_cache_get_page(hc, start_page, page_off, step, dst);

                dst += step;
                length -= step;
                page_off += step;
                if (page_off >= CACHE_PAGE_SIZE) {
                        page_off -= CACHE_PAGE_SIZE;
                        start_page++;
                }
        }

        return ret;
}

/* Public read entry point: translate the chunk-space address into page
 * space and delegate to the paged reader. Uses the shared helper
 * chunk_to_page_idx instead of duplicating its arithmetic inline. */
int lsv_hash_cache_get(lsv_hash_cache_context_t *hc, uint32_t chunk_id, uint32_t chunk_off, uint32_t length, void *data)
{
        return lsv_hash_cache_paged_get(hc, chunk_to_page_idx(chunk_id, chunk_off),
                                        chunk_off % CACHE_PAGE_SIZE, length, data);
}

/* Write @length bytes at @page_off into cached page @page_idx, then write
 * the whole page back to the chunk. Loads the page into the cache first if
 * it is not resident. Returns 0 on success or a lock/alloc/IO error.
 * Caller must ensure page_off + length <= CACHE_PAGE_SIZE. */
int lsv_hash_cache_update_page(lsv_hash_cache_context_t *hc, uint64_t page_idx, uint32_t page_off, uint32_t length, void *data)
{
        int ret;
        lsv_cache_entry_t *e;
        uint32_t chunk_id;
        uint32_t chunk_off;

        ret = plock_wrlock(&hc->lock);
        if (unlikely(ret))
                return ret;     /* lock NOT held: must not fall through to unlock */

        e = hash_table_find(hc->ht, (void *)&page_idx);
        if (e) {
                /* Best-effort LRU bump, as in the read path. */
                int lret = plock_wrlock(&hc->list_lock);
                if (!lret) {
                        atomic_inc(&e->counter);
                        if (atomic_read(&e->counter) > 10) {
                                atomic_set(&e->counter, 0);
                                /* entry first, list head second — the original
                                 * arguments were swapped */
                                list_move_tail(&e->hook, &hc->cache_list);
                        }
                        plock_unlock(&hc->list_lock);
                }
        } else {
                ret = lsv_hash_cache_malloc_func(hc, &e, page_idx);
                if (unlikely(ret)) {
                        YASSERT(0);
                        goto err;       /* release builds must not use a NULL e */
                }

                ret = hash_table_insert(hc->ht, (void *)e, &e->page_idx, 0);
                if (unlikely(ret)) {
                        YASSERT(0);
                }
        }

        assert(e->page_idx == page_idx);
        memcpy(e->page_data + page_off, data, length);

        page_idx_to_chunks(page_idx, &chunk_id, &chunk_off);
        /* NOTE(review): rejects chunk 0, i.e. the first pages of the volume
         * are never written through this path — confirm intended. */
        assert(chunk_id);
        /* Write back the whole cached page. The buffer holds exactly
         * CACHE_PAGE_SIZE bytes, so use that length (the original passed
         * LSV_PAGE_SIZE, risking an overread if the two differ). */
        ret = lsv_bitmap_write_chunk(hc->volume_context, chunk_id, chunk_off, CACHE_PAGE_SIZE, e->page_data);    //todo, rollback after failed.
err:
        plock_unlock(&hc->lock);
        return ret;
}

/* Split a write of @length bytes starting at (@start_page, @page_off) into
 * per-page updates, mirroring lsv_hash_cache_paged_get. Errors from the
 * individual page updates are OR-ed into the result. */
static inline int lsv_hash_cache_paged_update(lsv_hash_cache_context_t *hc, uint64_t start_page, uint32_t page_off, uint32_t length, void *data)
{
        int ret = 0;
        uint8_t *src = data;

        while (length > 0) {
                /* largest slice that stays within the current page */
                uint32_t step = min(length, CACHE_PAGE_SIZE - page_off);

                ret |= lsv_hash_cache_update_page(hc, start_page, page_off, step, src);

                src += step;
                length -= step;
                page_off += step;
                if (page_off >= CACHE_PAGE_SIZE) {
                        page_off -= CACHE_PAGE_SIZE;
                        start_page++;
                }
        }

        return ret;
}

/* Public write entry point: translate the chunk-space address into page
 * space and delegate to the paged writer. Uses the shared helper
 * chunk_to_page_idx instead of duplicating its arithmetic inline. */
int lsv_hash_cache_update(lsv_hash_cache_context_t *hc, uint32_t chunk_id, uint32_t chunk_off, uint32_t length, void *data)
{
        return lsv_hash_cache_paged_update(hc, chunk_to_page_idx(chunk_id, chunk_off),
                                           chunk_off % CACHE_PAGE_SIZE, length, data);
}

/*
int lsv_hash_cache_allocate_chunk(lsv_hash_cache_context_t *hc)
{
        uint32_t chunk_id;

        int ret = lsv_bitmap_alloc_chunk(hc->volume_context, &chunk_id, 0);
        if(ret) {
                //fatal error.
                return ret;
        }

        return 0;
}
*/